library(tidyverse)
library(arrow)
library(stm)
library(foreach)
library(doParallel)
library(pbmcapply)
library(progress)
#cluster <- makeCluster(20)
#registerDoParallel(cluster) 


# ---- Load inputs and split by publisher ----
# Reads the processed article table, keeps the rows with estimation_set == 0,
# loads the previously saved topic-model objects, restricts the articles to a
# fixed list of publishers, and splits them into one data frame per publisher.
set.seed(1683767830)
print("loading data")
newswhip <- arrow::read_parquet("data/processed/filtered_newswhip.parquet") |>
    as_tibble() |>
    filter(estimation_set == 0)

# Debugging aid: uncomment to work on a 5% random sample.
# newswhip <- newswhip |> slice_sample(prop = 0.05)

load("data/stable_topic_model.RData")
# Alternative model source:
# load("data/multiple_topic_models.RData")
# target_model <- models$runout[[6]]

# fit_new() reads `target_model` and `out` from the global environment. Check
# for them here so a missing object fails immediately instead of inside the
# forked workers, where the error is much harder to see.
if (!exists("target_model") || !exists("out")) {
    stop(
        "Expected objects `target_model` and `out` to be available after ",
        "loading data/stable_topic_model.RData",
        call. = FALSE
    )
}

print("running")
target_publishers <- c(
    "BuzzFeed", "CNS", "Daily Beast", "Daily Wire",
    "Federalist", "Gateway Pundit", "Infowars", "Intercept",
    "MSNBC", "NBC News", "New York Post", "Front Page Magazine", "Politico",
    "NewsBusters", "NPR", "RT", "Rush Limbaugh", "Salon", "Vice",
    "Washington Examiner", "Washington Post", "Western Journal",
    "World Net Daily"
)

# A listed publisher with no rows is most likely a spelling mismatch between
# this list and the data; surface it rather than silently fitting fewer models.
absent_publishers <- setdiff(
    target_publishers,
    as.character(unique(newswhip$publisher))
)
if (length(absent_publishers) > 0) {
    warning(
        "No rows found for publisher(s): ",
        paste(absent_publishers, collapse = ", "),
        call. = FALSE
    )
}

# One tibble per publisher; each element becomes one parallel job below.
newswhip <- newswhip |>
    filter(publisher %in% target_publishers) |>
    group_split(publisher)

#' Fit one publisher's documents to an existing topic model and save the result
#'
#' Pre-processes `df$headline_and_blurb` with `textProcessor()` (stemming and
#' stop-word removal on), prunes the corpus with
#' `prepDocuments(lower.thresh = 10)`, aligns it to the vocabulary of `model`
#' with `alignCorpus()`, and calls `fitNewDocuments()` with a covariate
#' prevalence prior (`~ publisher + s(day)`). The fitted object `res` and the
#' aligned corpus `newdocs` are written to
#' `<out_dir>/extra_results_<publisher>.RData`.
#'
#' @param df Data frame of documents. Must contain `headline_and_blurb` and
#'   `publisher`; the prevalence formula also refers to `day`. The first value
#'   of `publisher` is used in the output file name, so `df` is expected to
#'   hold a single publisher.
#' @param model Previously fitted topic model. Defaults to the global
#'   `target_model`.
#' @param orig_meta Metadata passed to `fitNewDocuments()` as `origData`.
#'   Defaults to the global `out$meta`.
#' @param out_dir Directory the `.RData` file is written to.
#' @return `NULL`, invisibly. Called for its side effect of writing a file.
fit_new <- function(df, model = target_model, orig_meta = out$meta,
                    out_dir = "data") {
    stopifnot(is.data.frame(df), nrow(df) > 0)
    # Evaluate the defaults now so a missing global fails before the
    # expensive text processing rather than after it.
    force(model)
    force(orig_meta)

    publisher_name <- df$publisher[1]

    proc <- textProcessor(
        df$headline_and_blurb,
        metadata = df,
        stem = TRUE,
        removestopwords = TRUE
    )
    prepped <- prepDocuments(
        proc$documents,
        proc$vocab,
        proc$meta,
        lower.thresh = 10
    )

    # Re-index the new corpus against the vocabulary of the fitted model.
    newdocs <- alignCorpus(new = prepped, old.vocab = model$vocab)

    res <- fitNewDocuments(
        model = model,
        documents = newdocs$documents,
        newData = newdocs$meta,
        origData = orig_meta,
        prevalence = ~ publisher + s(day),
        prevalencePrior = "Covariate"
    )

    # Object names `res` and `newdocs` are what a later load() will restore.
    save(
        res, newdocs,
        file = file.path(
            out_dir,
            glue::glue("extra_results_{publisher_name}.RData")
        )
    )

    # Locals are released when the function returns; nothing to clean up.
    invisible(NULL)
}
# ---- Fit every publisher in parallel (3 workers) and report failures ----
#
# mclapply-style runners turn an error in a worker into a "try-error" element
# plus a generic warning, and with prescheduling one error discards the result
# of every job assigned to the same core. Catching errors per job keeps the
# remaining publishers running and lets us name exactly which ones failed.
run_publisher <- function(df) {
    tryCatch(
        {
            fit_new(df)
            TRUE
        },
        error = function(e) conditionMessage(e)
    )
}

outcomes <- pbmclapply(newswhip, run_publisher, mc.cores = 3)

# NOTE(review): some pbmcapply versions appear to return
# list(value = <results>, warning = <message>) when a warning was raised.
# Unwrap that shape if present -- confirm against the installed version.
if (is.list(outcomes) && setequal(names(outcomes), c("value", "warning"))) {
    outcomes <- outcomes[["value"]]
}

if (length(outcomes) != length(newswhip)) {
    stop(
        "Expected ", length(newswhip), " results from pbmclapply but got ",
        length(outcomes),
        call. = FALSE
    )
}

publishers <- vapply(
    newswhip,
    function(df) as.character(df$publisher[1]),
    character(1)
)

# TRUE means fit_new() finished; a string is the caught error message; anything
# else (e.g. NULL) means the worker did not deliver a result.
succeeded <- vapply(outcomes, isTRUE, logical(1))

if (!all(succeeded)) {
    reasons <- vapply(
        outcomes[!succeeded],
        function(x) {
            if (is.character(x)) {
                paste(x, collapse = " ")
            } else {
                "worker returned no result"
            }
        },
        character(1)
    )
    # Results for the publishers that succeeded are already on disk; stop so
    # the script exits with a non-zero status and the failures are visible.
    stop(
        "fit_new failed for ", sum(!succeeded), " publisher(s):\n",
        paste0("  ", publishers[!succeeded], ": ", reasons, collapse = "\n"),
        call. = FALSE
    )
}
#stopCluster(cluster)